package cronjob

import (
	"github.com/DiracLee/dires-go/ds/dict"
	"github.com/DiracLee/dires-go/logger"
	"time"
)

type task struct {
	id     string
	delay  time.Duration
	circle int
	run    func() error
}

type slot = dict.Dict

type cronjob struct {
	interval    time.Duration
	ticker      *time.Ticker
	taskIDToIdx map[string]int
	slotCount   int
	taskSlots   []slot
	currentIdx  int
	chAdd       chan task
	chRemove    chan string
	chStop      chan struct{}
}

func New(options ...Option) *cronjob {
	const (
		defaultInterval         = time.Second
		defaultSlotCount        = 2048
		defaultChAddBuffSize    = 16
		defaultChRemoveBuffSize = 16
	)
	cj := &cronjob{
		interval:    defaultInterval,
		ticker:      time.NewTicker(defaultInterval),
		taskIDToIdx: make(map[string]int),
		slotCount:   defaultSlotCount,
		taskSlots:   make([]slot, defaultSlotCount),
		currentIdx:  0,
		chAdd:       make(chan task, defaultChAddBuffSize),
		chRemove:    make(chan string, defaultChRemoveBuffSize),
		chStop:      make(chan struct{}),
	}
	for i := 0; i < cj.slotCount; i++ {
		cj.taskSlots[i] = dict.NewConcurrent(16)
	}

	for _, option := range options {
		option.Apply(cj)
	}
	return cj
}

func (cj *cronjob) Start() {
	cj.ticker = time.NewTicker(cj.interval)
	go cj.schedule()
}

func (cj *cronjob) Stop() {
	cj.chStop <- struct{}{}
}

func (cj *cronjob) schedule() {
	select {
	case <-cj.ticker.C:
		cj.handleTicker()
	case task := <-cj.chAdd:
		cj.addTask(&task)
	case taskID := <-cj.chRemove:
		cj.removeTask(taskID)
	case <-cj.chStop:
		cj.Stop()
	}
}

func (cj *cronjob) handleTicker() {
	slot := cj.taskSlots[cj.currentIdx]
	cj.consumeSlot(slot)
	cj.currentIdx++
	if cj.currentIdx == cj.slotCount {
		cj.currentIdx = 0
	}
}

func (cj *cronjob) consumeSlot(slot slot) {
	removedIDs := make([]string, 0)
	slot.ForEach(func(id string, val interface{}) bool {
		task, ok := val.(*task)
		if !ok {
			return true
		}
		if task.circle > 0 {
			task.circle--
			return true
		}
		t := *task
		go func() {
			err := t.run()
			if err != nil {
				logger.Warn("task id not found")
				return
			}
		}()
		removedIDs = append(removedIDs, id)
		return true
	})
	for _, id := range removedIDs {
		slot.Remove(id)
		if id != "" {
			delete(cj.taskIDToIdx, id)
		}
	}
}

func (cj *cronjob) addTask(t *task) {
	delaySeconds := int(t.delay.Seconds())
	intervalSeconds := int(cj.interval.Seconds())
	t.circle = delaySeconds / intervalSeconds / cj.slotCount
	index := (cj.currentIdx + delaySeconds/intervalSeconds) % cj.slotCount
	cj.taskSlots[index].PutOrSet(t.id, t)
	if t.id != "" {
		cj.taskIDToIdx[t.id] = index
	}
}

func (cj *cronjob) RegisterDelay(delay time.Duration, taskID string, run func() error) {
	if delay < 0 {
		return
	}
	cj.chAdd <- task{delay: delay, id: taskID, run: run}
}

func (cj *cronjob) removeTask(taskID string) {
	index, ok := cj.taskIDToIdx[taskID]
	if !ok {
		logger.Warn("task id not found")
		return
	}
	slot := cj.taskSlots[index]
	slot.Remove(taskID)
	if taskID != "" {
		delete(cj.taskIDToIdx, taskID)
	}
}

func (cj *cronjob) Cancel(taskID string) {
	if taskID == "" {
		return
	}
	cj.chRemove <- taskID
}
